/*
 * Copyright (c) 2009, Michael van der Westhuizen
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *    - Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *    - Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
#ifndef MQSESSION_MCVDW_20090517_C
#define MQSESSION_MCVDW_20090517_C

#include "imq.h"

#include "mqstatus.c"
#include "util.c"
#include "mqdestination.c"
#include "mqmessage.c"
#include "mqproducer.c"
#include "mqconsumer-callbacks.c"
#include "mqconsumer.c"



static int mqcrt_MQSession_close_session(mqcrt_MQSession *self, int raise_errors)
{
    MQStatus status;
    MQSessionHandle handle;

    if (self->closed) {
        return 1;
    }

    self->closed = 1;
    COPY_HANDLE(handle, self->session);
    INIT_HANDLE(self->session);

    if (HANDLE_IS_VALID(handle)) {
        status = MQCloseSession(handle);

        if (raise_errors) {
            if (setExceptionFromStatus(status)) {
                COPY_HANDLE(self->session, handle);
                self->closed = 0;
                return 0;
            }
        }
    }

    return 1;
}


static int mqcrt_MQSession_traverse(mqcrt_MQSession *self, visitproc visit, void *arg)
{
    DPRINTF(("%s\n", "INFO: mqcrt_MQSession_traverse: entry"));
    Py_VISIT(self->connection);
    DPRINTF(("%s\n", "INFO: mqcrt_MQSession_traverse: exit"));
    return 0;
}


static int mqcrt_MQSession_clear(mqcrt_MQSession *self)
{
    DPRINTF(("%s\n", "INFO: mqcrt_MQSession_clear: entry"));
    Py_CLEAR(self->connection);
    DPRINTF(("%s\n", "INFO: mqcrt_MQSession_clear: exit"));
    return 0;
}


static void mqcrt_MQSession_dealloc(mqcrt_MQSession *self)
{
    DPRINTF(("%s\n", "INFO: mqcrt_MQSession_dealloc: entry"));
    mqcrt_MQSession_close_session(self, 0);
    mqcrt_MQSession_clear(self);
    self->ob_type->tp_free((PyObject*)self);
    DPRINTF(("%s\n", "INFO: mqcrt_MQSession_dealloc: exit"));
}


static PyObject * mqcrt_MQSession_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
    mqcrt_MQSession * self;

    DPRINTF(("%s\n", "INFO: mqcrt_MQSession_new: entry"));
    if (NULL == (self = (mqcrt_MQSession *)type->tp_alloc(type, 0))) {
        return NULL;
    }

    INIT_HANDLE(self->session);
    self->connection = NULL;
    self->closed = 1;
    DPRINTF(("%s\n", "INFO: mqcrt_MQSession_new: exit"));
    return (PyObject *)self;
}


static PyObject * mqcrt_MQSession_close(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    if (!mqcrt_MQSession_close_session(self, 1)) {
        return NULL;
    }

    Py_RETURN_NONE;
}


static PyObject * mqcrt_MQSession_create_destination(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    PyObject * name;
    MQDestinationType type;
    MQDestinationHandle handle;
    PyObject * obj;

    static char *kwlist[] = {"name", "type", NULL};

    INIT_HANDLE(handle);

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "Oi", kwlist, &name, &type)) {
        return NULL;
    }

    if (NULL == (name = coerceObjectToUtf8String(name))) {
        return NULL;
    }

    if (setExceptionFromStatus(MQCreateDestination(self->session, PyString_AsString(name), type, &handle))) {
        Py_CLEAR(name);
        return NULL;
    }

    Py_CLEAR(name);

    if (NULL == (obj = mqcrt_MQDestination_Factory(handle, self))) {
        MQFreeDestination(handle);
        return NULL;
    }

    return obj;
}


static PyObject * mqcrt_MQSession_create_temporary_destination(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    MQDestinationType type;
    MQDestinationHandle handle;
    PyObject * obj;

    static char *kwlist[] = {"type", NULL};

    INIT_HANDLE(handle);

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "i", kwlist, &type)) {
        return NULL;
    }

    if (setExceptionFromStatus(MQCreateTemporaryDestination(self->session, type, &handle))) {
        return NULL;
    }

    if (NULL == (obj = mqcrt_MQDestination_Factory(handle, self))) {
        MQFreeDestination(handle);
        return NULL;
    }

    return obj;
}


static PyObject * mqcrt_MQSession_create_producer_for_destination(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    mqcrt_MQDestination * dst;
    PyObject * obj;
    MQProducerHandle handle;

    static char *kwlist[] = {"destination", NULL};

    INIT_HANDLE(handle);

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O", kwlist, &dst)) {
        return NULL;
    }

    if (setExceptionFromStatus(MQCreateMessageProducerForDestination(self->session, dst->destination, &handle))) {
        return NULL;
    }

    if (NULL == (obj = mqcrt_MQProducer_Factory(handle, self))) {
        MQCloseMessageProducer(handle);
        return NULL;
    }

    return obj;
}


static PyObject * mqcrt_MQSession_create_producer(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    PyObject * obj;
    MQProducerHandle handle;

    INIT_HANDLE(handle);

    if (setExceptionFromStatus(MQCreateMessageProducer(self->session, &handle))) {
        return NULL;
    }

    if (NULL == (obj = mqcrt_MQProducer_Factory(handle, self))) {
        MQCloseMessageProducer(handle);
        return NULL;
    }

    return obj;
}


static PyObject * mqcrt_MQSession_create_consumer(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    mqcrt_MQDestination * dst;
    PyObject * messageSelector;
    ConstMQString selector;
    MQBool noLocal;
    mqcrt_MQConsumer * obj;

    static char *kwlist[] = {"destination", "selector", "nolocal", NULL};

    selector = NULL;
    messageSelector = NULL;
    noLocal = MQ_FALSE;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|Oi", kwlist, &dst, &messageSelector, &noLocal)) {
        return NULL;
    }

    if (NULL != messageSelector && Py_None != messageSelector) {
        if (NULL == (messageSelector = coerceObjectToUtf8String(messageSelector))) {
            return NULL;
        }

        selector = PyString_AsString(messageSelector);
    }

    if (NULL == (obj = (mqcrt_MQConsumer *)PyObject_CallObject((PyObject *)&mqcrt_MQConsumerType, NULL))) {
        Py_CLEAR(messageSelector);
        return NULL;
    }

    if (setExceptionFromStatus(MQCreateMessageConsumer(self->session, dst->destination, selector, noLocal, &obj->consumer))) {
        Py_CLEAR(messageSelector);
        Py_CLEAR(obj);
        return NULL;
    }

    Py_CLEAR(messageSelector);
    selector = NULL;

    obj->closed = 0;
    obj->session = (PyObject *)self;
    Py_INCREF(obj->session);
    return (PyObject *)obj;
}


static PyObject * mqcrt_MQSession_create_durable_consumer(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    mqcrt_MQDestination * dst;
    PyObject * durableName;
    PyObject * messageSelector;
    ConstMQString selector;
    MQBool noLocal;
    mqcrt_MQConsumer * obj;

    static char *kwlist[] = {"destination", "name", "selector", "nolocal", NULL};

    selector = NULL;
    messageSelector = NULL;
    noLocal = MQ_FALSE;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "OO|Oi", kwlist, &dst, &durableName, &messageSelector, &noLocal)) {
        return NULL;
    }

    if (NULL == (durableName = coerceObjectToUtf8String(durableName))) {
        return NULL;
    }

    if (NULL != messageSelector && Py_None != messageSelector) {
        if (NULL == (messageSelector = coerceObjectToUtf8String(messageSelector))) {
            Py_CLEAR(durableName);
            return NULL;
        }

        selector = PyString_AsString(messageSelector);
    }

    if (NULL == (obj = (mqcrt_MQConsumer *)PyObject_CallObject((PyObject *)&mqcrt_MQConsumerType, NULL))) {
        Py_CLEAR(durableName);
        Py_CLEAR(messageSelector);
        return NULL;
    }

    if (setExceptionFromStatus(MQCreateDurableMessageConsumer(self->session, dst->destination, PyString_AsString(durableName), selector, noLocal, &obj->consumer))) {
        Py_CLEAR(durableName);
        Py_CLEAR(messageSelector);
        Py_CLEAR(obj);
        return NULL;
    }

    Py_CLEAR(durableName);
    Py_CLEAR(messageSelector);
    selector = NULL;

    obj->closed = 0;
    obj->session = (PyObject *)self;
    Py_INCREF(obj->session);
    return (PyObject *)obj;
}


static PyObject * mqcrt_MQSession_create_async_consumer(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    mqcrt_MQDestination * dst;
    PyObject * messageSelector;
    ConstMQString selector;
    MQBool noLocal;
    PyObject * listener;
    PyObject * listener_data;
    mqcrt_MQConsumer * obj;
    messageListenerData * callback_data;

    static char *kwlist[] = {"destination", "selector", "nolocal", "listener", "listener_data", NULL};

    selector = NULL;
    listener_data = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "OOiO|O", kwlist, &dst, &messageSelector, &noLocal, &listener, &listener_data)) {
        return NULL;
    }

    if (!PyCallable_Check(listener)) {
        PyErr_SetString(PyExc_TypeError, "listener must be callable");
        return NULL;
    }

    if (NULL != messageSelector && Py_None != messageSelector) {
        if (NULL == (messageSelector = coerceObjectToUtf8String(messageSelector))) {
            return NULL;
        }

        selector = PyString_AsString(messageSelector);
    }

    if (NULL == (obj = (mqcrt_MQConsumer *)PyObject_CallObject((PyObject *)&mqcrt_MQConsumerType, NULL))) {
        Py_CLEAR(messageSelector);
        return NULL;
    }

    obj->session = (PyObject *)self;
    Py_INCREF(obj->session);

    if (NULL == (obj->callback_data = PyMem_Malloc(sizeof(messageListenerData)))) {
        Py_CLEAR(messageSelector);
        Py_CLEAR(obj);
        return NULL;
    }

    memset(obj->callback_data, 0, sizeof(messageListenerData));
    callback_data = (messageListenerData *)obj->callback_data;

    callback_data->callable = listener;
    Py_XINCREF(callback_data->callable);

    if (NULL != listener_data && Py_None != listener_data) {
        callback_data->args = listener_data;
        Py_XINCREF(callback_data->args);
    }

    callback_data->consumer = obj;
    callback_data = NULL;

    if (setExceptionFromStatus(MQCreateAsyncMessageConsumer(self->session, dst->destination, selector, noLocal, (MQMessageListenerFunc)messageListenerFunc, obj->callback_data, &obj->consumer))) {
        Py_CLEAR(messageSelector);
        Py_CLEAR(obj);
        return NULL;
    }

    Py_CLEAR(messageSelector);
    selector = NULL;

    obj->closed = 0;
    return (PyObject *)obj;
}


static PyObject * mqcrt_MQSession_create_async_durable_consumer(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    mqcrt_MQDestination * dst;
    PyObject * durableName;
    PyObject * messageSelector;
    ConstMQString name;
    ConstMQString selector;
    MQBool noLocal;
    PyObject * listener;
    PyObject * listener_data;
    mqcrt_MQConsumer * obj;
    messageListenerData * callback_data;

    static char *kwlist[] = {"destination", "name", "selector", "nolocal", "listener", "listener_data", NULL};

    selector = NULL;
    listener_data = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "OOOiO|O", kwlist, &dst, &durableName, &messageSelector, &noLocal, &listener, &listener_data)) {
        return NULL;
    }

    if (!PyCallable_Check(listener)) {
        PyErr_SetString(PyExc_TypeError, "listener must be callable");
        return NULL;
    }

    if (NULL == (durableName = coerceObjectToUtf8String(durableName))) {
        return NULL;
    }

    name = PyString_AsString(durableName);

    if (NULL != messageSelector && Py_None != messageSelector) {
        if (NULL == (messageSelector = coerceObjectToUtf8String(messageSelector))) {
            Py_CLEAR(durableName);
            return NULL;
        }

        selector = PyString_AsString(messageSelector);
    }

    if (NULL == (obj = (mqcrt_MQConsumer *)PyObject_CallObject((PyObject *)&mqcrt_MQConsumerType, NULL))) {
        Py_CLEAR(durableName);
        Py_CLEAR(messageSelector);
        return NULL;
    }

    obj->session = (PyObject *)self;
    Py_INCREF(obj->session);

    if (NULL == (obj->callback_data = PyMem_Malloc(sizeof(messageListenerData)))) {
        Py_CLEAR(durableName);
        Py_CLEAR(messageSelector);
        Py_CLEAR(obj);
        return NULL;
    }

    memset(obj->callback_data, 0, sizeof(messageListenerData));
    callback_data = (messageListenerData *)obj->callback_data;

    callback_data->callable = listener;
    Py_XINCREF(callback_data->callable);

    if (NULL != listener_data && Py_None != listener_data) {
        callback_data->args = listener_data;
        Py_XINCREF(callback_data->args);
    }

    callback_data->consumer = obj;
    callback_data = NULL;

    if (setExceptionFromStatus(MQCreateAsyncDurableMessageConsumer(self->session, dst->destination, name, selector, noLocal, (MQMessageListenerFunc)messageListenerFunc, obj->callback_data, &obj->consumer))) {
        Py_CLEAR(durableName);
        Py_CLEAR(messageSelector);
        Py_CLEAR(obj);
        return NULL;
    }

    Py_CLEAR(durableName);
    name = NULL;
    Py_CLEAR(messageSelector);
    selector = NULL;

    obj->closed = 0;
    return (PyObject *)obj;
}


static PyObject * mqcrt_MQSession_unsubscribe_durable_consumer(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    PyObject * durableName;

    static char *kwlist[] = {"name", NULL};

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O", kwlist, &durableName)) {
        return NULL;
    }

    if (NULL == (durableName = coerceObjectToUtf8String(durableName))) {
        return NULL;
    }

    if (setExceptionFromStatus(MQUnsubscribeDurableMessageConsumer(self->session, PyString_AsString(durableName)))) {
        Py_CLEAR(durableName);
        return NULL;
    }

    Py_CLEAR(durableName);
    Py_RETURN_NONE;
}


static PyObject * mqcrt_MQSession_recover(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    if (setExceptionFromStatus(MQRecoverSession(self->session))) {
        return NULL;
    }

    Py_RETURN_NONE;
}


static PyObject * mqcrt_MQSession_commit(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    if (setExceptionFromStatus(MQCommitSession(self->session))) {
        return NULL;
    }

    Py_RETURN_NONE;
}


static PyObject * mqcrt_MQSession_rollback(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    if (setExceptionFromStatus(MQRollbackSession(self->session))) {
        return NULL;
    }

    Py_RETURN_NONE;
}


static PyObject * mqcrt_MQSession_acknowledge_mode(mqcrt_MQSession *self, PyObject *args, PyObject *kwds)
{
    MQAckMode ackMode;

    if (setExceptionFromStatus(MQGetAcknowledgeMode(self->session, &ackMode))) {
        return NULL;
    }

    return Py_BuildValue("i", ackMode);
}




static PyMethodDef mqcrt_MQSession_methods[] = {
    {"close", (PyCFunction)mqcrt_MQSession_close, METH_NOARGS, "Closes the session."},
    {"create_producer_for_destination", (PyCFunction)mqcrt_MQSession_create_producer_for_destination, METH_VARARGS | METH_KEYWORDS, "Creates a message producer with a specified destination."},
    {"create_producer", (PyCFunction)mqcrt_MQSession_create_producer, METH_NOARGS, "Creates a message producer with no specified destination."},
    {"create_consumer", (PyCFunction)mqcrt_MQSession_create_consumer, METH_VARARGS | METH_KEYWORDS, "Creates a message consumer with the given properties for synchronous receiving."},
    {"create_durable_consumer", (PyCFunction)mqcrt_MQSession_create_durable_consumer, METH_VARARGS | METH_KEYWORDS, "Creates a durable message consumer with the given properties for synchronous receiving."},
    {"create_async_consumer", (PyCFunction)mqcrt_MQSession_create_async_consumer, METH_VARARGS | METH_KEYWORDS, "Creates a message consumer for asynchronous receiving. The session must be created MQ_SESSION_ASYNC_RECEIVE."},
    {"create_async_durable_consumer", (PyCFunction)mqcrt_MQSession_create_async_durable_consumer, METH_VARARGS | METH_KEYWORDS, "Creates a durable message consumer for asynchronous receiving. The session must be created MQ_SESSION_ASYNC_RECEIVE."},
    {"unsubscribe_durable_consumer", (PyCFunction)mqcrt_MQSession_unsubscribe_durable_consumer, METH_VARARGS | METH_KEYWORDS, "Unsubscribes the durable message consumer with the given durable name.  This deletes all messages at the broker that were being stored for this durable consumer."},
    {"recover", (PyCFunction)mqcrt_MQSession_recover, METH_NOARGS, "Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message."},
    {"commit", (PyCFunction)mqcrt_MQSession_commit, METH_NOARGS, "Commits all messages done in this transaction."},
    {"rollback", (PyCFunction)mqcrt_MQSession_rollback, METH_NOARGS, "Rolls back any messages done in this transaction."},
    {"acknowledge_mode", (PyCFunction)mqcrt_MQSession_acknowledge_mode, METH_NOARGS, "Get session acknowledge mode."},
    {"create_destination", (PyCFunction)mqcrt_MQSession_create_destination, METH_VARARGS | METH_KEYWORDS, "Creates a destination with the given name and type."},
    {"create_temporary_destination", (PyCFunction)mqcrt_MQSession_create_temporary_destination, METH_VARARGS | METH_KEYWORDS, "Creates a temporary destination of the given type."},
    {NULL}
};

static PyTypeObject mqcrt_MQSessionType = {
    PyObject_HEAD_INIT(NULL)
    0,                                       /* ob_size */
    "imq.MQSession",                         /* tp_name */
    sizeof(mqcrt_MQSession),                 /* tp_basicsize */
    0,                                       /* tp_itemsize */
    (destructor)mqcrt_MQSession_dealloc,     /* tp_dealloc */
    0,                                       /* tp_print */
    0,                                       /* tp_getattr */
    0,                                       /* tp_setattr */
    0,                                       /* tp_compare */
    0,                                       /* tp_repr */
    0,                                       /* tp_as_number */
    0,                                       /* tp_as_sequence */
    0,                                       /* tp_as_mapping */
    0,                                       /* tp_hash */
    0,                                       /* tp_call */
    0,                                       /* tp_str */
    0,                                       /* tp_getattro */
    0,                                       /* tp_setattro */
    0,                                       /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC, /* tp_flags */
    "MQSession objects",                     /* tp_doc */
    (traverseproc)mqcrt_MQSession_traverse,  /* tp_traverse */
    (inquiry)mqcrt_MQSession_clear,          /* tp_clear */
    0,                                       /* tp_richcompare */
    0,                                       /* tp_weaklistoffset */
    0,                                       /* tp_iter */
    0,                                       /* tp_iternext */
    mqcrt_MQSession_methods,                 /* tp_methods */
    0,                                       /* tp_members */
    0,                                       /* tp_getset */
    0,                                       /* tp_base */
    0,                                       /* tp_dict */
    0,                                       /* tp_descr_get */
    0,                                       /* tp_descr_set */
    0,                                       /* tp_dictoffset */
    0,                                       /* tp_init */
    0,                                       /* tp_alloc */
    0,                                       /* tp_new */
};



/* XXX: needs to be fleshed out for XA sessions and their callbacks */
static PyObject * mqcrt_MQSession_Factory(MQSessionHandle handle, mqcrt_MQConnection * connection)
{
    mqcrt_MQSession * sess;

    DPRINTF(("%s\n", "INFO: mqcrt_MQSession_Factory: entry"));

    if (!HANDLE_IS_VALID(handle)) {
        DPRINTF(("%s\n", "INFO: mqcrt_MQSession_Factory: error [invalid session handle]"));
        PyErr_SetString(PyExc_ValueError, "session handle is not valid");
        return NULL;
    }

    if (NULL == connection) {
        DPRINTF(("%s\n", "INFO: mqcrt_MQSession_Factory: error [invalid connection object]"));
        PyErr_SetString(PyExc_ValueError, "connection object is not valid");
        return NULL;
    }

    if (!HANDLE_IS_VALID(connection->connection)) {
        DPRINTF(("%s\n", "INFO: mqcrt_MQSession_Factory: error [invalid connection handle]"));
        PyErr_SetString(PyExc_ValueError, "connection handle is not valid");
        return NULL;
    }

    if (NULL == (sess = (mqcrt_MQSession *)mqcrt_MQSession_new(&mqcrt_MQSessionType, NULL, NULL))) {
        DPRINTF(("%s\n", "INFO: mqcrt_MQSession_Factory: error [allocating new session object]"));
        return NULL;
    }

    COPY_HANDLE(sess->session, handle);
    sess->connection = (PyObject *)connection;
    Py_INCREF(sess->connection);
    sess->closed = 0;

    DPRINTF(("%s\n", "INFO: mqcrt_MQSession_Factory: exit"));
    return (PyObject *)sess;
}

#endif
